{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "reported-geometry",
   "metadata": {},
   "source": [
    "In this example, we use TensorFlow v1 (version 1.15) to create a simple MLP model and then move the application to Cluster Serving step by step.\n",
    "\n",
    "This tutorial is intended for TensorFlow v1 users only. If you do not use TensorFlow v1, the Keras tutorial [here](#keras-to-cluster-serving-example.ipynb) is recommended instead."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "athletic-trance",
   "metadata": {},
   "source": [
    "### Original TensorFlow v1 Application"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "olive-dutch",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'1.15.0'"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import tensorflow as tf\n",
    "tf.__version__"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "vertical-recall",
   "metadata": {},
   "source": [
    "We first define the TensorFlow graph and then create some data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "tropical-clinton",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "WARNING:tensorflow:From <ipython-input-2-853d23643c61>:24: where (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.\n",
      "Instructions for updating:\n",
      "Use tf.where in 2.0, which has the same broadcast rule as np.where\n"
     ]
    }
   ],
   "source": [
    "g = tf.Graph()\n",
    "with g.as_default():\n",
    "   \n",
    "    # Graph Inputs\n",
    "    features = tf.placeholder(dtype=tf.float32, \n",
    "                              shape=[None, 2], name='features')\n",
    "    targets = tf.placeholder(dtype=tf.float32, \n",
    "                             shape=[None, 1], name='targets')\n",
    "\n",
    "    # Model Parameters\n",
    "    weights = tf.Variable(tf.zeros(shape=[2, 1], \n",
    "                          dtype=tf.float32), name='weights')\n",
    "    bias = tf.Variable([[0.]], dtype=tf.float32, name='bias')\n",
    "    \n",
    "\n",
    "    \n",
    "    # Forward Pass\n",
    "    linear = tf.add(tf.matmul(features, weights), bias, name='linear')\n",
    "    ones = tf.ones(shape=tf.shape(linear)) \n",
    "    zeros = tf.zeros(shape=tf.shape(linear))\n",
    "    prediction = tf.where(condition=tf.less(linear, 0.),\n",
    "                          x=zeros, \n",
    "                          y=ones, \n",
    "                          name='prediction')\n",
    "    \n",
    "    # Backward Pass\n",
    "    errors = targets - prediction\n",
    "    weight_update = tf.assign_add(weights, \n",
    "                                  tf.reshape(errors * features, (2, 1)),\n",
    "                                  name='weight_update')\n",
    "    bias_update = tf.assign_add(bias, errors,\n",
    "                                name='bias_update')\n",
    "    \n",
    "    train = tf.group(weight_update, bias_update, name='train')\n",
    "    \n",
    "    saver = tf.train.Saver(name='saver')\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "legislative-boutique",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "((3, 2), (3,))"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import numpy as np\n",
    "x_train, y_train = np.array([[1,2],[3,4],[1,3]]), np.array([1,2,1])\n",
    "x_train.shape, y_train.shape"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "coated-grill",
   "metadata": {},
   "source": [
    "### Export TensorFlow SavedModel\n",
    "Next, we train the graph and, inside the same `with tf.Session` block, export it as a SavedModel. The full code follows; the prediction for input `[1,2]` is `[1]`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "detailed-message",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Model parameters:\n",
      "\n",
      "Weights:\n",
      " [[15.]\n",
      " [20.]]\n",
      "Bias: [[5.]]\n",
      "[[1.]\n",
      " [1.]\n",
      " [1.]]\n",
      "WARNING:tensorflow:From <ipython-input-5-399ffbde562b>:26: simple_save (from tensorflow.python.saved_model.simple_save) is deprecated and will be removed in a future version.\n",
      "Instructions for updating:\n",
      "This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.simple_save.\n",
      "WARNING:tensorflow:From /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages/tensorflow_core/python/saved_model/signature_def_utils_impl.py:201: build_tensor_info (from tensorflow.python.saved_model.utils_impl) is deprecated and will be removed in a future version.\n",
      "Instructions for updating:\n",
      "This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.\n",
      "INFO:tensorflow:Assets added to graph.\n",
      "INFO:tensorflow:No assets to write.\n",
      "INFO:tensorflow:SavedModel written to: /tmp/mlp_tf1/saved_model.pb\n"
     ]
    }
   ],
   "source": [
    "with tf.Session(graph=g) as sess:\n",
    "    \n",
    "    sess.run(tf.global_variables_initializer())\n",
    "    \n",
    "    for epoch in range(5):\n",
    "        for example, target in zip(x_train, y_train):\n",
    "            feed_dict = {'features:0': example.reshape(-1, 2),\n",
    "                         'targets:0': target.reshape(-1, 1)}\n",
    "            _ = sess.run(['train'], feed_dict=feed_dict)\n",
    "\n",
    "\n",
    "    w, b = sess.run(['weights:0', 'bias:0'])    \n",
    "    print('Model parameters:\\n')\n",
    "    print('Weights:\\n', w)\n",
    "    print('Bias:', b)\n",
    "\n",
    "    saver.save(sess, save_path='perceptron')\n",
    "    \n",
    "    pred = sess.run('prediction:0', feed_dict={features: x_train})\n",
    "    print(pred)\n",
    "    \n",
    "    # Within this session, export the model in SavedModel format.\n",
    "    # The signature keys are the tensor names, e.g. 'features:0'.\n",
    "    inputs = dict([(features.name, features)])\n",
    "    outputs = dict([(prediction.name, prediction)])\n",
    "    tf.saved_model.simple_save(sess, \"/tmp/mlp_tf1\", inputs, outputs)"
   ]
  },
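  {
   "cell_type": "markdown",
   "id": "numpy-perceptron-note",
   "metadata": {},
   "source": [
    "To make the update rule in the graph easier to follow, here is a minimal NumPy sketch of the same computation. The helper `train_perceptron_numpy` and the variable names `x_np`, `y_np`, `w_np`, `b_np` are illustrative assumptions and are not part of TensorFlow or Cluster Serving. With the same toy data and 5 epochs, it should arrive at the parameters printed by the training cell (weights 15 and 20, bias 5)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "numpy-perceptron-sketch",
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "\n",
    "def train_perceptron_numpy(x, y, epochs=5):\n",
    "    # Hypothetical helper mirroring the graph: zero-initialised weights and bias\n",
    "    w = np.zeros((2, 1), dtype=np.float32)\n",
    "    b = np.zeros((1, 1), dtype=np.float32)\n",
    "    for _ in range(epochs):\n",
    "        for example, target in zip(x, y):\n",
    "            feats = example.reshape(-1, 2)\n",
    "            linear = feats @ w + b\n",
    "            # Step activation: 0 where linear < 0, otherwise 1\n",
    "            pred = np.where(linear < 0.0, 0.0, 1.0).astype(np.float32)\n",
    "            error = target.reshape(-1, 1) - pred\n",
    "            w += (error * feats).reshape(2, 1)  # same role as weight_update\n",
    "            b += error                          # same role as bias_update\n",
    "    return w, b\n",
    "\n",
    "x_np = np.array([[1, 2], [3, 4], [1, 3]], dtype=np.float32)\n",
    "y_np = np.array([1, 2, 1], dtype=np.float32)\n",
    "w_np, b_np = train_perceptron_numpy(x_np, y_np)\n",
    "print(w_np.ravel(), b_np.ravel())"
   ]
  },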
  {
   "cell_type": "markdown",
   "id": "consolidated-newport",
   "metadata": {},
   "source": [
    "### Deploy Cluster Serving\n",
    "Once the model is prepared, we deploy it on Cluster Serving.\n",
    "\n",
    "First, install Cluster Serving."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "inner-texas",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Collecting bigdl-serving\n",
      "Requirement already satisfied: httpx in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from bigdl-serving) (0.17.1)\n",
      "Requirement already satisfied: pyarrow in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from bigdl-serving) (3.0.0)\n",
      "Requirement already satisfied: pyyaml in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from bigdl-serving) (5.4.1)\n",
      "Requirement already satisfied: redis in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from bigdl-serving) (3.5.3)\n",
      "Requirement already satisfied: opencv-python in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from bigdl-serving) (4.5.1.48)\n",
      "Requirement already satisfied: certifi in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from httpx->bigdl-serving) (2020.12.5)\n",
      "Requirement already satisfied: httpcore<0.13,>=0.12.1 in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from httpx->bigdl-serving) (0.12.3)\n",
      "Requirement already satisfied: rfc3986[idna2008]<2,>=1.3 in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from httpx->bigdl-serving) (1.4.0)\n",
      "Requirement already satisfied: sniffio in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from httpx->bigdl-serving) (1.2.0)\n",
      "Requirement already satisfied: h11==0.* in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from httpcore<0.13,>=0.12.1->httpx->bigdl-serving) (0.12.0)\n",
      "Requirement already satisfied: idna in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from rfc3986[idna2008]<2,>=1.3->httpx->bigdl-serving) (3.1)\n",
      "Requirement already satisfied: numpy>=1.14.5 in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from opencv-python->bigdl-serving) (1.20.1)\n",
      "Installing collected packages: bigdl-serving\n",
      "Successfully installed bigdl-serving-0.9.0\n"
     ]
    }
   ],
   "source": [
    "! pip install bigdl-serving"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "working-terrorism",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Trying to find config file in  /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages/bigdl/conf/config.yaml\r\n",
      "Config file found in pip package, copying...\r\n",
      "Config file ready.\r\n",
      "Cluster Serving has been properly set up.\r\n",
      "You did not specify BIGDL_VERSION, will download 0.9.0\r\n",
      "BIGDL_VERSION is 0.9.0\r\n",
      "BIGDL_VERSION is 0.12.1\r\n",
      "SPARK_VERSION is 2.4.3\r\n",
      "2.4\r\n",
      "You are installing Cluster Serving by pip, downloading...\r\n"
     ]
    }
   ],
   "source": [
    "import os\n",
    "! mkdir cluster-serving\n",
    "os.chdir('cluster-serving')\n",
    "! cluster-serving-init"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "excited-exception",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "  2800K .......... .......... .......... .......... ..........  0% 11.8M 19m20s\r\n",
      "  2850K .......... .......... .......... .......... ..........  0% 11.3M 19m1s\r\n",
      "  2900K .......... .......... .......... .......... ..........  0% 8.60M 18m43s\r\n",
      "  2950K .......... .......... .......... .......... ..........  0% 11.9M 18m25s\r\n",
      "  3000K .......... .......... .......... .......... ..........  0% 11.8M 18m7s\r\n",
      "  3050K .......... .......... .......... .......... ..........  0%  674K 18m4s\r\n",
      "  3100K .......... .......... .......... .......... ..........  0%  418K 18m9s\r\n",
      "  3150K .......... .......... .......... .......... ..........  0% 1.05M 18m0s\r\n",
      "  3200K .......... .......... .......... .......... ..........  0%  750K 17m56s\r\n",
      "  3250K .......... .......... .......... ...."
     ]
    }
   ],
   "source": [
    "! tail wget-log"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "casual-premium",
   "metadata": {},
   "outputs": [],
   "source": [
    "# If the download is slow, as in the log above, you can fetch the jar directly with the following command:\n",
    "# ! wget https://repo1.maven.org/maven2/com/intel/analytics/bigdl/bigdl-spark_2.4.3/0.9.0/bigdl-spark_2.4.3-0.9.0-serving.jar\n",
    "\n",
    "# If you download with wget, or if `ls` shows \"bigdl-xxx-serving.jar\", run `mv *serving.jar bigdl.jar` once the download has finished."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "ruled-bermuda",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "bigdl-spark_2.4.3-0.9.0-serving.jar  config.yaml  wget-log\r\n"
     ]
    }
   ],
   "source": [
    "# After initialization finishes, check the directory\n",
    "! ls"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "computational-rehabilitation",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Rename the downloaded jar to bigdl.jar, as mentioned above\n",
    "! mv *serving.jar bigdl.jar"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "personal-central",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "config.yaml  wget-log  bigdl.jar\r\n"
     ]
    }
   ],
   "source": [
    "! ls"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "combined-stability",
   "metadata": {},
   "source": [
    "We set the model path in `config.yaml` as follows (configuration details are in [Cluster Serving Configuration](https://github.com/intel-analytics/bigdl/blob/master/docs/docs/ClusterServingGuide/ProgrammingGuide.md#2-configuration))."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "received-hayes",
   "metadata": {},
   "outputs": [],
   "source": [
    "## BigDL Cluster Serving\n",
    "\n",
    "model:\n",
    "  # model path must be provided\n",
    "  path: /tmp/mlp_tf1"
   ]
  },
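  {
   "cell_type": "markdown",
   "id": "config-path-helper-note",
   "metadata": {},
   "source": [
    "If you prefer to set the path from Python rather than editing the file by hand, the sketch below shows one possible approach using only string handling. The helper `set_model_path` is hypothetical and not part of Cluster Serving, and it assumes the `model:` / `path:` layout shown in this notebook. The demo runs on an in-memory sample, so it does not touch your real `config.yaml`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "config-path-helper-sketch",
   "metadata": {},
   "outputs": [],
   "source": [
    "def set_model_path(config_text, model_path):\n",
    "    # Hypothetical helper (not part of Cluster Serving): set `path:` under `model:`\n",
    "    lines = config_text.splitlines()\n",
    "    in_model = False\n",
    "    for i, line in enumerate(lines):\n",
    "        stripped = line.strip()\n",
    "        if line and not line[0].isspace():\n",
    "            # A non-indented line starts a new top-level entry\n",
    "            in_model = stripped.startswith('model:')\n",
    "        elif in_model and stripped.startswith('path:'):\n",
    "            indent = line[:len(line) - len(line.lstrip())]\n",
    "            lines[i] = indent + 'path: ' + model_path\n",
    "            break\n",
    "    return '\\n'.join(lines) + '\\n'\n",
    "\n",
    "# Demo on an in-memory sample that follows the layout used in this notebook\n",
    "sample_config = (\n",
    "    '## BigDL Cluster Serving\\n'\n",
    "    '\\n'\n",
    "    'model:\\n'\n",
    "    '  # model path must be provided\\n'\n",
    "    '  path:\\n'\n",
    "    '  name:\\n'\n",
    "    'data:\\n'\n",
    "    '  src:\\n'\n",
    ")\n",
    "updated_config = set_model_path(sample_config, '/tmp/mlp_tf1')\n",
    "print(updated_config)\n",
    "\n",
    "# To apply it to the real file (assumes config.yaml is in the current directory):\n",
    "# with open('config.yaml') as f:\n",
    "#     text = f.read()\n",
    "# with open('config.yaml', 'w') as f:\n",
    "#     f.write(set_model_path(text, '/tmp/mlp_tf1'))"
   ]
  },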
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "satellite-honey",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "## BigDL Cluster Serving\r\n",
      "\r\n",
      "model:\r\n",
      "  # model path must be provided\r\n",
      "  path: /tmp/mlp_tf1\r\n",
      "  # name, default is serving_stream, you need to specify if running multiple servings\r\n",
      "  name:\r\n",
      "data:\r\n",
      "  # default, localhost:6379\r\n",
      "  src:\r\n"
     ]
    }
   ],
   "source": [
    "! head config.yaml"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "planned-hometown",
   "metadata": {},
   "source": [
    "### Start Cluster Serving\n",
    "\n",
    "Cluster Serving requires Flink and Redis to be installed and the corresponding environment variables to be set; see the [Cluster Serving Installation Guide](https://github.com/intel-analytics/bigdl/blob/master/docs/docs/ClusterServingGuide/ProgrammingGuide.md#1-installation) for details.\n",
    "\n",
    "The Flink cluster must be running before Cluster Serving starts. If it is not running yet, use the following command to start a local Flink cluster."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "antique-melbourne",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting cluster.\n",
      "Starting standalonesession daemon on host user-PC.\n",
      "Starting taskexecutor daemon on host user-PC.\n"
     ]
    }
   ],
   "source": [
    "! $FLINK_HOME/bin/start-cluster.sh"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "interested-bench",
   "metadata": {},
   "source": [
    "After configuration, start Cluster Serving with `cluster-serving-start` (details are in the [Cluster Serving Programming Guide](https://github.com/intel-analytics/bigdl/blob/master/docs/docs/ClusterServingGuide/ProgrammingGuide.md#3-launching-service))."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "modern-monster",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "model_path=\"/tmp/mlp_tf1\"\n",
      "redis_timeout=\"5000\"\n",
      "Redis maxmemory is not set, using default value 8G\n",
      "redis server started, please check log in redis.log\n",
      "OK\n",
      "OK\n",
      "OK\n",
      "redis config maxmemory set to 8G\n",
      "OK\n",
      "OK\n",
      "Starting new Cluster Serving job.\n",
      "Cluster Serving job submitted, check log in log-cluster_serving-serving_stream.txt\n",
      "To list Cluster Serving job status, use cluster-serving-cli list\n",
      "{maxmem=null, timeout=5000}timeout getted: 5000\n"
     ]
    }
   ],
   "source": [
    "! cluster-serving-start"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "improved-rough",
   "metadata": {},
   "source": [
    "### Prediction using Cluster Serving\n",
    "Next, we send requests to Cluster Serving from the Python client."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "id": "immune-madness",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "redis group exist, will not create new one\n",
      "redis group exist, will not create new one\n",
      "Write to Redis successful\n",
      "redis group exist, will not create new one\n",
      "Write to Redis successful\n"
     ]
    }
   ],
   "source": [
    "from bigdl.serving.client import InputQueue, OutputQueue\n",
    "input_queue = InputQueue()\n",
    "# Use the async API to put and get: pass a name when enqueuing, then query the result by that name\n",
    "arr = np.array([1,2])\n",
    "input_queue.enqueue('my-input', t=arr)\n",
    "output_queue = OutputQueue()\n",
    "prediction = output_queue.query('my-input')\n",
    "# Use the sync API to predict; this blocks until the result is returned or the call times out\n",
    "prediction = input_queue.predict(arr)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "id": "signal-attention",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "array([1.], dtype=float32)"
      ]
     },
     "execution_count": 23,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "prediction"
   ]
  },
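  {
   "cell_type": "markdown",
   "id": "local-check-note",
   "metadata": {},
   "source": [
    "As an optional sanity check, the served result can be compared with a local NumPy evaluation of the trained model. The sketch below assumes the parameters printed by the training cell (weights 15 and 20, bias 5); `w_trained`, `b_trained`, `local_predict` and `expected` are illustrative names, not part of the Cluster Serving client."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "local-check-sketch",
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "\n",
    "# Parameters as printed by the training cell earlier in this notebook\n",
    "w_trained = np.array([[15.0], [20.0]], dtype=np.float32)\n",
    "b_trained = np.array([[5.0]], dtype=np.float32)\n",
    "\n",
    "def local_predict(x):\n",
    "    # Same forward pass as the graph: step activation on x @ w + b\n",
    "    linear = np.asarray(x, dtype=np.float32).reshape(-1, 2) @ w_trained + b_trained\n",
    "    return np.where(linear < 0.0, 0.0, 1.0).astype(np.float32)\n",
    "\n",
    "expected = local_predict([1, 2])  # 1*15 + 2*20 + 5 = 60, which is not below 0\n",
    "print(expected)\n",
    "\n",
    "# With a running service, compare against the served value, for example:\n",
    "# np.allclose(np.ravel(prediction), expected.ravel())"
   ]
  },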
  {
   "cell_type": "markdown",
   "id": "suitable-selection",
   "metadata": {},
   "source": [
    "The `prediction` result should be the same as the one obtained with TensorFlow.\n",
    "\n",
    "This is the end of this tutorial. If you have any questions, you can raise an issue at [Analytics bigdl Github](https://github.com/intel-analytics/bigdl/issues)."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
